/* XXL: The eXtensible and fleXible Library for data processing

Copyright (C) 2000-2011 Prof. Dr. Bernhard Seeger
                        Head of the Database Research Group
                        Department of Mathematics and Computer Science
                        University of Marburg
                        Germany

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library;  If not, see <http://www.gnu.org/licenses/>. 

    http://code.google.com/p/xxl/

*/

package xxl.core.math.functions;

import xxl.core.collections.bags.Bag;
import xxl.core.predicates.Equal;
import xxl.core.predicates.Predicate;
import xxl.core.predicates.RightBind;

/**
 * This class realizes an aggregation-function performing an early duplicate
 * elimination during an aggregate operation. <br>
 * Using this function in an Aggregator offers the ability
 * to perform SQL statements like: <br>
 * <pre>
 * 	SELECT DISTINCT SUM(A), DISTINCT AVG(B)
 * 	FROM Table1
 * 	WHERE ...
 * </pre>
 *
 * A DistinctAggregateFunction decorates a given aggregation-function
 * and and uses a bag to realize the duplicate elimination.
 * That means, the elements of the input cursor are inserted in the bag
 * and the specified predicate determines during the execution
 * of the query() method, if an element in the bag matches
 * with the next element, the aggregate should be computed
 * with. If no predicate has been specified in a constructor,
 * the predicate {@link xxl.core.predicates.Equal} is used as a
 * default. <br>
 * So the next aggregate, i.e.
 * <code>function.invoke(aggregate, next)</code>,
 * is only computed, if the bag does not contain the
 * next element the aggregate function will be invoked with.
 *
 * @param <P> the type of the aggregated values.
 * @param <A> the return type of the function, i.e., the type of the aggregate.
 */
public class DistinctAggregationFunction<P, A> extends DecoratorAggregationFunction<P, A> {

	/**
	 * The bag containing unique elements of the input cursor.
	 */
	protected Bag<P> bag;

	/**
	 * The predicate used to determine, if an element of the
	 * bag and the next element, the incremental aggregate
	 * should be computed with, are equal.
	 */
	protected Predicate<? super P> predicate;

	/**
	 * Creates a new DistinctAggregateFunction.
	 *
	 * @param function an aggregation-function computing an incremental aggregate, e.g. SUM, AVG, ...
	 * @param bag a bag used to store the input cursor's elements.
	 * @param predicate binary predicate determining if an element of the bag and the next
	 * 		element, the incremental aggregate should be computed with, are equal
	 */
	public DistinctAggregationFunction(AggregationFunction<? super P, A> function, Bag<P> bag, Predicate<? super P> predicate) {
		super(function);
		this.bag = bag;
		this.predicate = predicate;
	}

	/**
	 * Creates a new DistinctAggregateFunction.
	 * Uses a default instance of the predicate {@link xxl.core.predicates.Equal}.
	 *
	 * @param function an aggregation-function computing an incremental aggregate, e.g. SUM, AVG, ...
	 * @param bag a bag used to store the input cursor's elements
	 */
	public DistinctAggregationFunction(AggregationFunction<? super P, A> function, Bag<P> bag) {
		this(function, bag, Equal.DEFAULT_INSTANCE);
	}

	/**
	 * Creates a new DistinctAggregateFunction.
	 * Uses a bag generated by Bag.FACTORY_METHOD.invoke() and
	 * a default instance of the predicate {@link xxl.core.predicates.Equal}.
	 *
	 * @param function an aggregation-function computing an incremental aggregate, e.g. SUM, AVG, ...
	 */
	public DistinctAggregationFunction(AggregationFunction<? super P, A> function) {
		this(function, (Bag)Bag.FACTORY_METHOD.invoke(), Equal.DEFAULT_INSTANCE);
	}

	/**
	 * Overrides the invoke method with two parameters used by XXL's incremental
	 * Aggregators in the packages xxl.core.cursors and xxl.core.relational. <br>
	 * If the size of the bag is greater than 0, the bag is queried
	 * in order to check if the <tt>next</tt> element is already contained in
	 * the bag. If this is the case, the last aggregate is returned.
	 * Otherwise the <tt>next</tt> element is inserted in the bag and
	 * the aggregate is incrementally computed as usual.
	 *
	 * @param aggregate aggregate
	 * @param next next element of the input cursor
	 * @return last aggregate, if <tt>next</tt> is contained in the bag,
	 * 		otherwise the new computed aggregate is returned
	 */
	public A invoke(A aggregate, P next) {
		if (bag.size() > 0)
 			if (bag.query(new RightBind<P>(predicate, next)).hasNext())
 				return aggregate;
		bag.insert(next);
		return super.invoke(aggregate, next);
	}

}
